# 4.2 搭建第一个 MCP 服务器

**第4周 | 第2课 | 从零构建 MCP Server | 预计时长：45分钟**

---

## 学习目标

完成本课后，你将能够：

- 安装 MCP Python SDK 并创建项目
- 使用 `@server.tool()` 装饰器编写工具
- 通过 stdio 传输启动 MCP Server
- 编写 MCP Client 连接 Server 并调用工具
- 完整运行"Server + Client"的交互流程

---

## 1. 环境准备

### 1.1 安装依赖

```bash
# 创建项目目录
mkdir mcp-demo && cd mcp-demo

# 创建虚拟环境（推荐）
python -m venv .venv

# Windows 激活
.venv\Scripts\activate
# macOS/Linux 激活
# source .venv/bin/activate

# 安装 MCP SDK
pip install mcp

# 验证安装
python -c "import mcp; print(mcp.__version__)"
```

### 1.2 项目结构

```
mcp-demo/
├── server.py          # MCP Server（工具提供者）
├── client.py          # MCP Client（Agent 侧）
└── requirements.txt   # 依赖清单
```

```txt
# requirements.txt
mcp>=1.0.0
```

---

## 2. 编写第一个 MCP Server

我们将创建一个"助手 Server"，提供 3 个工具：

| 工具 | 功能 | 参数 |
|------|------|------|
| `calculator` | 数学运算 | operation, a, b |
| `get_time` | 查询当前时间 | timezone |
| `get_weather` | 查询天气（模拟） | city |

### 2.1 完整 Server 代码

```python
# server.py
"""
第一个 MCP Server：提供计算器、时间查询、天气查询工具
"""
from datetime import datetime
from mcp.server.fastmcp import FastMCP

# 创建 Server 实例
mcp = FastMCP(
    name="assistant-tools",
    description="助手工具集：计算器、时间、天气",
)


# ==================== 工具 1：计算器 ====================

@mcp.tool()
def calculator(operation: str, a: float, b: float) -> str:
    """
    执行基础数学运算。

    Args:
        operation: 运算类型，支持 add(加), subtract(减), multiply(乘), divide(除)
        a: 第一个数字
        b: 第二个数字

    Returns:
        格式化的计算结果字符串
    """
    ops = {
        "add": lambda x, y: x + y,
        "subtract": lambda x, y: x - y,
        "multiply": lambda x, y: x * y,
        "divide": lambda x, y: x / y if y != 0 else "错误：除数不能为零",
    }

    # 参数校验
    if operation not in ops:
        return f"错误：不支持的运算类型 '{operation}'，支持：{', '.join(ops.keys())}"

    result = ops[operation](a, b)
    op_symbols = {"add": "+", "subtract": "-", "multiply": "×", "divide": "÷"}

    return f"{a} {op_symbols[operation]} {b} = {result}"


# ==================== 工具 2：时间查询 ====================

@mcp.tool()
def get_time(timezone: str = "Asia/Shanghai") -> str:
    """
    获取指定时区的当前日期和时间。

    Args:
        timezone: IANA 时区名称，默认 Asia/Shanghai（北京时间）

    Returns:
        格式化的日期时间字符串
    """
    # 常见时区映射
    tz_map = {
        "Asia/Shanghai": 8,
        "Asia/Tokyo": 9,
        "America/New_York": -5,
        "America/Los_Angeles": -8,
        "Europe/London": 0,
        "UTC": 0,
    }

    if timezone not in tz_map:
        return (
            f"错误：不支持的时区 '{timezone}'。\n"
            f"支持的时区：{', '.join(tz_map.keys())}"
        )

    now = datetime.now()
    # 简单时区偏移（生产环境应使用 pytz 或 zoneinfo）
    utc_offset = tz_map[timezone]
    from datetime import timedelta, timezone as tz
    tz_info = tz(timedelta(hours=utc_offset))
    local_time = datetime.now(tz_info)

    return (
        f"当前时间（{timezone}）：\n"
        f"  日期：{local_time.strftime('%Y年%m月%d日')}\n"
        f"  时间：{local_time.strftime('%H:%M:%S')}\n"
        f"  星期：{local_time.strftime('%A')}"
    )


# ==================== 工具 3：天气查询（模拟） ====================

# 模拟天气数据
_WEATHER_DATA = {
    "北京": {"天气": "晴", "温度": "18°C ~ 28°C", "湿度": "45%", "风力": "北风2级"},
    "上海": {"天气": "多云", "温度": "20°C ~ 26°C", "湿度": "65%", "风力": "东南风3级"},
    "广州": {"天气": "雷阵雨", "温度": "25°C ~ 32°C", "湿度": "80%", "风力": "南风4级"},
    "深圳": {"天气": "阵雨", "温度": "24°C ~ 30°C", "湿度": "75%", "风力": "西南风3级"},
    "成都": {"天气": "阴", "温度": "16°C ~ 22°C", "湿度": "70%", "风力": "北风1级"},
    "杭州": {"天气": "小雨", "温度": "18°C ~ 24°C", "湿度": "85%", "风力": "东风2级"},
}


@mcp.tool()
def get_weather(city: str) -> str:
    """
    查询指定城市的天气信息。

    Args:
        city: 城市名称，如 北京、上海、广州 等

    Returns:
        格式化的天气信息
    """
    if city not in _WEATHER_DATA:
        return (
            f"未找到 '{city}' 的天气信息。\n"
            f"支持的城市：{', '.join(_WEATHER_DATA.keys())}"
        )

    w = _WEATHER_DATA[city]
    return (
        f"{city} 天气：\n"
        f"  天气状况：{w['天气']}\n"
        f"  温度范围：{w['温度']}\n"
        f"  湿度：{w['湿度']}\n"
        f"  风力：{w['风力']}"
    )


# ==================== 启动 Server ====================

if __name__ == "__main__":
    # 使用 stdio 传输启动 Server
    # Agent 将通过标准输入输出与此 Server 通信
    mcp.run()
```

### 2.2 运行 Server

```bash
python server.py
```

Server 启动后会在后台等待 Client 连接。你不会有太多视觉反馈，因为 stdio 传输是无 UI 的。

**验证 Server 正常**：新开一个终端，用简单的 Python 脚本测试：

```python
# quick_test.py — 快速验证 Server 是否正常
import subprocess
import json

# 启动 Server 进程
proc = subprocess.Popen(
    ["python", "server.py"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)

# 发送 initialize 请求
init_request = json.dumps({
    "jsonrpc": "2.0",
    "id": 0,
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "test", "version": "1.0"},
    },
}) + "\n"
proc.stdin.write(init_request)
proc.stdin.flush()

# 读取响应
response = proc.stdout.readline()
data = json.loads(response)
print(f"初始化成功: {data.get('result', {}).get('serverInfo', {})}")

proc.terminate()
```

---

## 3. 编写 MCP Client

Client 是 Agent 侧的代码，负责连接 Server、发现工具、调用工具。

### 3.1 完整 Client 代码

```python
# client.py
"""
MCP Client：连接 Server 并调用工具
模拟 Agent 与 MCP Server 的交互流程
"""
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    """主流程：连接 Server → 列出工具 → 调用工具"""

    # 步骤 1：配置 Server 参数
    # command 和 args 告诉 Client 如何启动 Server
    server_params = StdioServerParameters(
        command="python",           # 运行命令
        args=["server.py"],         # Server 脚本
        env=None,                   # 额外环境变量（可选）
    )

    # 步骤 2：建立连接
    async with stdio_client(server_params) as (read, write):
        # read 和 write 是异步流，用于与 Server 通信
        async with ClientSession(read, write) as session:
            # 步骤 3：初始化会话
            # 这一步交换双方能力信息，是 MCP 协议的握手过程
            await session.initialize()
            print("✅ 已连接到 MCP Server\n")

            # 步骤 4：列出可用工具
            tools_result = await session.list_tools()
            print("=" * 50)
            print("📋 可用工具列表：")
            print("=" * 50)
            for tool in tools_result.tools:
                print(f"\n工具名：{tool.name}")
                print(f"描述：{tool.description}")
                if hasattr(tool, 'inputSchema') and tool.inputSchema:
                    print(f"参数：{tool.inputSchema}")
            print("\n")

            # 步骤 5：调用工具 — 计算器
            print("=" * 50)
            print("🔧 调用 calculator 工具：15 × 8")
            print("=" * 50)
            result = await session.call_tool(
                "calculator",
                {"operation": "multiply", "a": 15, "b": 8}
            )
            for content in result.content:
                print(content.text)
            print()

            # 步骤 6：调用工具 — 时间查询
            print("=" * 50)
            print("🔧 调用 get_time 工具：东京时间")
            print("=" * 50)
            result = await session.call_tool(
                "get_time",
                {"timezone": "Asia/Tokyo"}
            )
            for content in result.content:
                print(content.text)
            print()

            # 步骤 7：调用工具 — 天气查询
            print("=" * 50)
            print("🔧 调用 get_weather 工具：北京")
            print("=" * 50)
            result = await session.call_tool(
                "get_weather",
                {"city": "北京"}
            )
            for content in result.content:
                print(content.text)
            print()

            # 步骤 8：测试错误处理
            print("=" * 50)
            print("🔧 测试错误：不支持的城市 '纽约'")
            print("=" * 50)
            result = await session.call_tool(
                "get_weather",
                {"city": "纽约"}
            )
            for content in result.content:
                print(content.text)
            print()

            # 步骤 9：测试错误处理 — 除以零
            print("=" * 50)
            print("🔧 测试错误：除以零")
            print("=" * 50)
            result = await session.call_tool(
                "calculator",
                {"operation": "divide", "a": 10, "b": 0}
            )
            for content in result.content:
                print(content.text)
            print()

    print("✅ 所有调用完成，连接已关闭")


if __name__ == "__main__":
    asyncio.run(main())
```

### 3.2 运行 Client

```bash
python client.py
```

**预期输出**：

```
✅ 已连接到 MCP Server

==================================================
📋 可用工具列表：
==================================================

工具名：calculator
描述：执行基础数学运算。
参数：{'type': 'object', 'properties': {'operation': {'type': 'string'}, 'a': {'type': 'number'}, 'b': {'type': 'number'}}, 'required': ['operation', 'a', 'b']}

工具名：get_time
描述：获取指定时区的当前日期和时间。
参数：{'type': 'object', 'properties': {'timezone': {'type': 'string'}}, 'required': []}

工具名：get_weather
描述：查询指定城市的天气信息。
参数：{'type': 'object', 'properties': {'city': {'type': 'string'}}, 'required': ['city']}

==================================================
🔧 调用 calculator 工具：15 × 8
==================================================
15.0 × 8.0 = 120.0

==================================================
🔧 调用 get_time 工具：东京时间
==================================================
当前时间（Asia/Tokyo）：
  日期：2025年05月13日
  时间：15:30:45
  星期：Tuesday

==================================================
🔧 调用 get_weather 工具：北京
==================================================
北京 天气：
  天气状况：晴
  温度范围：18°C ~ 28°C
  湿度：45%
  风力：北风2级

==================================================
🔧 测试错误：不支持的城市 '纽约'
==================================================
未找到 '纽约' 的天气信息。
支持的城市：北京, 上海, 广州, 深圳, 成都, 杭州

==================================================
🔧 测试错误：除以零
==================================================
错误：除数不能为零

✅ 所有调用完成，连接已关闭
```

---

## 4. 深入理解：工具装饰器的高级用法

### 4.1 使用 Pydantic 模型定义参数

当参数比较复杂时，用 Pydantic 模型比纯字典更清晰：

```python
from pydantic import BaseModel, Field
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("advanced-server")


class SearchParams(BaseModel):
    """搜索参数模型"""
    query: str = Field(description="搜索关键词")
    max_results: int = Field(default=10, ge=1, le=100, description="最大返回数量")
    sort_by: str = Field(default="relevance", description="排序方式")


@mcp.tool()
def search(params: SearchParams) -> str:
    """执行高级搜索"""
    return (
        f"搜索关键词：{params.query}\n"
        f"最大结果：{params.max_results}\n"
        f"排序方式：{params.sort_by}\n"
        f"（这里是模拟搜索结果）"
    )
```

### 4.2 工具返回多种内容类型

工具不仅可以返回文本，还能返回图像、资源引用等：

```python
from mcp.server.fastmcp import FastMCP
from mcp.types import TextContent, ImageContent

mcp = FastMCP("multi-content")


@mcp.tool()
def generate_chart(data_type: str) -> list:
    """
    生成数据图表。

    Returns:
        包含文本描述和图像的列表
    """
    return [
        TextContent(type="text", text=f"已生成 {data_type} 图表"),
        # 实际项目中这里可以返回 Base64 编码的图像
        # ImageContent(type="image", data="base64...", mimeType="image/png")
    ]
```

### 4.3 带进度的长时间运行工具

```python
from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("long-running")


@mcp.tool()
async def process_data(file_path: str, ctx: Context) -> str:
    """
    处理数据文件（可能需要较长时间）。

    Args:
        file_path: 要处理的文件路径
        ctx: MCP 上下文，用于发送进度信息
    """
    # 向 Client 发送进度更新
    await ctx.report_progress(0, 100)
    await ctx.info("开始读取文件...")

    # 模拟处理步骤
    await ctx.report_progress(25, 100)
    await ctx.info("正在解析数据...")

    await ctx.report_progress(75, 100)
    await ctx.info("正在生成报告...")

    await ctx.report_progress(100, 100)
    await ctx.info("处理完成！")

    return f"文件 {file_path} 处理完成，共处理 10,000 条记录"
```

---

## 5. 与 LLM 结合：让 Agent 自主选择工具

前面的例子是直接调用工具。在实际应用中，我们让 LLM 决定调用哪个工具：

```python
# agent.py — 让 LLM 通过 MCP 选择工具
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


# 模拟 LLM 响应（实际应用中调用 OpenAI / Claude API）
def mock_llm_with_tools(user_query: str, tools: list) -> dict:
    """
    模拟 LLM 的工具选择逻辑。
    实际项目中替换为真实的 LLM API 调用。
    """
    # 这里是简化的规则引擎，真实场景 LLM 会自己判断
    if "天气" in user_query:
        return {
            "tool": "get_weather",
            "arguments": {"city": user_query.split("天气")[0].strip() or "北京"},
        }
    elif "时间" in user_query or "几点" in user_query:
        return {"tool": "get_time", "arguments": {"timezone": "Asia/Shanghai"}}
    elif any(op in user_query for op in ["加", "减", "乘", "除", "+", "-", "*", "/"]):
        return {
            "tool": "calculator",
            "arguments": {"operation": "add", "a": 10, "b": 20},  # 简化处理
        }
    else:
        return None


async def agent_main(user_query: str):
    """Agent 主流程：接收用户输入 → LLM 选择工具 → MCP 调用"""

    server_params = StdioServerParameters(command="python", args=["server.py"])

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # 获取工具列表（传给 LLM 做选择）
            tools_result = await session.list_tools()
            tools = [
                {
                    "name": t.name,
                    "description": t.description,
                    "parameters": t.inputSchema,
                }
                for t in tools_result.tools
            ]

            # LLM 决定使用哪个工具
            action = mock_llm_with_tools(user_query, tools)

            if action:
                print(f"用户问题：{user_query}")
                print(f"LLM 选择工具：{action['tool']}")
                print(f"参数：{action['arguments']}")
                print()

                # 通过 MCP 调用工具
                result = await session.call_tool(
                    action["tool"], action["arguments"]
                )
                print("工具返回：")
                for content in result.content:
                    print(content.text)
            else:
                print(f"用户问题：{user_query}")
                print("LLM 认为不需要调用工具，直接回答。")


if __name__ == "__main__":
    # 测试几个不同的用户输入
    asyncio.run(agent_main("北京天气怎么样？"))
    print("---")
    asyncio.run(agent_main("现在几点了？"))
    print("---")
    asyncio.run(agent_main("你好，你是谁？"))
```

---

## 动手练习

### 练习 1：添加新工具

在 `server.py` 中添加一个新工具 `convert_currency`（货币换算）：

```python
# 汇率数据（简化版）
_RATES = {
    "USD_CNY": 7.24,
    "EUR_CNY": 7.85,
    "JPY_CNY": 0.048,
    "GBP_CNY": 9.15,
    "CNY_USD": 0.138,
}

@mcp.tool()
def convert_currency(amount: float, from_currency: str, to_currency: str) -> str:
    """
    货币换算。

    Args:
        amount: 金额
        from_currency: 源货币代码（USD, EUR, JPY, GBP, CNY）
        to_currency: 目标货币代码

    Returns:
        换算结果
    """
    key = f"{from_currency.upper()}_{to_currency.upper()}"
    if key not in _RATES:
        return f"错误：不支持 {from_currency} → {to_currency} 的换算"
    result = amount * _RATES[key]
    return f"{amount} {from_currency} = {result:.2f} {to_currency}"
```

用 Client 调用它验证正确性。

### 练习 2：改用 HTTP+SSE 传输

将 Server 改为 HTTP 模式运行：

```python
# server_http.py
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("assistant-tools")

# ... 同样的工具定义 ...

if __name__ == "__main__":
    # 使用 transport="sse" 启用 HTTP 模式
    mcp.run(transport="sse")
```

```bash
# 启动 HTTP Server（默认端口 8000）
python server_http.py

# 另一个终端，Client 通过 HTTP 连接
# 需要修改 client.py 使用 sse_client 替代 stdio_client
```

### 练习 3：工具组合调用

修改 Client，实现一个"旅行规划"场景：先查询目的地天气，再查询时间，最后做预算计算（用 calculator）。展示多次工具调用的完整流程。

---

## 本课总结

- **MCP Server 用装饰器定义工具**：`@mcp.tool()` 标记函数即为可用工具
- **参数和返回值**：函数签名自动生成 JSON Schema，LLM 据此决定如何调用
- **两种传输方式**：stdio（本地子进程）和 HTTP+SSE（网络服务）
- **Client 交互流程**：连接 → initialize → list_tools → call_tool
- **错误处理在工具内部完成**：返回友好的错误消息，而非抛出异常
- **与 LLM 结合**：工具列表传给 LLM，LLM 自主选择，MCP 执行调用

---

## 下一课预告

下一课我们将接入**真实数据源**：用 MCP 连接 SQLite 数据库查询用户和订单，以及调用外部 HTTP API 获取实时天气数据。让你的 Agent 真正"活"起来！
